package com.behavioranalysis.flinkprogram.flink.ad;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

/**
 * <h3>flinkprogram</h3>
 * <p>${description}</p>
 * Created by yang on 20-2-14 下午8:49
 * updated by yang on 20-2-14 下午8:49
 */
public class DistinctUseridProcess extends ProcessAllWindowFunction<Long, Long, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
        // 这个方法会进来此窗口中全部的black list userid,然后进行去重,最后addSink
        // 没问题！就在这里去重，然后addSink到Mysql
        Set<Long> distinctUserid = new HashSet<>();
        distinctUserid.addAll(IteratorUtils.toList(elements.iterator()));
        distinctUserid.forEach(out::collect);
    }
}
